Update HomeserverTestCase.get_success(...) and friends to drive async Rust (Tokio runtime/thread pool)#19871
Update HomeserverTestCase.get_success(...) and friends to drive async Rust (Tokio runtime/thread pool)#19871MadLittleMods wants to merge 30 commits into
HomeserverTestCase.get_success(...) and friends to drive async Rust (Tokio runtime/thread pool)#19871Conversation
get_success(...) and friends to drive async Rust (Tokio runtime/thread pool)HomeserverTestCase.get_success(...) and friends to drive async Rust (Tokio runtime/thread pool)
| event.room_version, | ||
| ), | ||
| exc=LimitExceededError, | ||
| by=0.5, |
There was a problem hiding this comment.
In a lot of cases, the by usage didn't seem necessary at all (test still passes) (no need to advance time in the reactor/clock)
| # whole chain to completion. | ||
| self.reactor.pump([by] * 100) | ||
|
|
||
| def get_success(self, d: Awaitable[TV], by: float = 0.0) -> TV: |
There was a problem hiding this comment.
Removed the by arg as it encourages bad behavior (people use it as a hammer to advance time without reasoning to make things work) and we arbitrarily advance time 100x this amount (imprecise).
I've instead updated the few places that we use this with a precise self.reactor.advance(...) as necessary.
| sync_d = ensureDeferred( | ||
| worker_presence_handler.user_syncing( | ||
| self.user_id, self.device_id, True, PresenceState.ONLINE | ||
| ), | ||
| by=0.1, | ||
| ) | ||
| ) | ||
| # `user_syncing` proxies the presence write to the main process over an HTTP | ||
| # replication request. The request body is streamed by a `Cooperator` that uses | ||
| # the clock to schedule each chunk at a tiny *non-zero* delay (`_EPSILON`), so | ||
| # we need to actually advance the clock for it to fire. | ||
| self.reactor.advance(Duration(microseconds=1).as_secs()) | ||
| self.get_success(sync_d) |
There was a problem hiding this comment.
This is the main pattern I'm recommending if you need to advance time by an non-zero increment. ensureDeferred works well but the name is a bit non-obvious to describe that we want to make the task run in the background on its own.
run_in_background(...) would also work but it's usage is a bit awkward. I guess we could use run_coroutine_in_background(...) instead 🤔
The difference between ensureDeferred(...) vs run_in_background(...)/run_coroutine_in_background(...) is all of the extra LoggingContext (log context) handling. It doesn't matter for tests though.
…te_room_membership_resume_after_restart`
| # XXX: There can be a few already dispatched database queries (from normal | ||
| # background tasks in Synapse) and the threadless `ThreadPool` that we use in | ||
| # tests uses *untracked* clock calls to pass database results back so `shutdown` | ||
| # doesn't cancel those calls. This is a quirk of our test infrastructure | ||
| # (threadless `ThreadPool`) so this kind of "hack" is fine. | ||
| self.reactor.advance(0) |
There was a problem hiding this comment.
The explanation is slightly hand-wavey
…ocess_join_after_server_leaves_room` `wait_for_background_updates` is not relevant
| # Process the leave and join in one go. | ||
| dir_handler.update_user_directory = True | ||
| dir_handler.notify_new_event() | ||
| self.wait_for_background_updates() |
There was a problem hiding this comment.
As far as I can tell self.wait_for_background_updates() is totally bogus here. I assume the mistake here was because notify_new_event(...) uses run_as_background_process(...) but that's a totally separate thing (background updates != background process)
This made the test work because it does wait_for_background_updates(...) did a get_success(..., by=0.1) which pumped and advanced the reactor/clock.
But we can replace it with something more precise.
| # reactor to run (like `reactor.callFromThread(...)`) | ||
| self.reactor.advance(0) | ||
|
|
||
| def get_success( |
There was a problem hiding this comment.
The primary change of this PR is changing get_success(...)/get_failure(...)` to be able to make progress on any awaitable that needs to do async Rust work.
The rest is just adjusting things because we removed the by arg (see other discussion) and stopped calling pump(...).
| # FIXME: Remove as this has the exact same semantics as `get_success()`. In | ||
| # https://github.com/matrix-org/synapse/pull/8402#discussion_r495992506 where it was | ||
| # introduced, it was claimed that "get_success fails the test if the deferred fails | ||
| # rather than raising, which I find a bit unintuitive." but `get_success()` actually | ||
| # does raise "@raise SynchronousTestCase.failureException : If the | ||
| # L{Deferred<twisted.internet.defer.Deferred>} has no result or has a failure | ||
| # result." at-least in today's world. |
There was a problem hiding this comment.
I think this is accurate (follow-up PR)
| duration_ms = 10 | ||
| await self.clock.sleep(Duration(milliseconds=count * duration_ms)) |
There was a problem hiding this comment.
Instead of making the sleep duration dependent on the count (dynamic), I've just just made it static so we can be precise with our time advancements below
| for callbable, args, kwargs in triggers: | ||
| callbable(*args, **kwargs) | ||
|
|
||
| def till_deferred_has_result( |
There was a problem hiding this comment.
We can remove till_deferred_has_result because get_success(...) covers it on its own now
| # Checking `d.called` by itself is not sufficient by itself as this is possible: | ||
| # | ||
| # If you have a first `Deferred` `D1`, you can add a callback which returns | ||
| # another `Deferred` `D2`, and `D2` must then complete before any further | ||
| # callbacks on `D1` will execute (and later callbacks on `D1` get the *result* | ||
| # of `D2` rather than `D2` itself). | ||
| # | ||
| # So, `D1` might have `called=True` (as in, it has started running its | ||
| # callbacks), but any new callbacks added to `D1` won't get run until `D2` | ||
| # completes. Fortunately, we can detect this by checking `d.paused`. | ||
| while not d.called or d.paused: |
There was a problem hiding this comment.
This language is the same explanation given in f22e7cd
You can reproduce the problem with this test: SYNAPSE_TEST_LOG_LEVEL=INFO poetry run trial tests.storage.databases.main.test_events_worker.GetEventCancellationTestCase.test_first_get_event_cancelled
Update
HomeserverTestCase.get_success(...)and friends to drive async Rust (Tokio runtime/thread pool)Spawning from adding some more async Rust things in #19846 and noticing that we have an existing pattern to use instead of the custom
till_deferred_has_result(...)that has crept in to a few files.Alternative to #19867 spurred on by this comment from @erikjohnston
Does this slow down the entire test suite?
trial (3.10, sqlite, all)trial (3.10, postgres, 14, all)Dev notes
#19394 (comment) and #19734 (comment) discuss why you sometimes need to
self.reactor.advance(0)before you can actuallyself.reactor.advance(...)in some cases and reasoning for whypump(...)may have become a thing.Todo
till_deferred_has_resultwait_on_threadPull Request Checklist
EventStoretoEventWorkerStore.".code blocks.